Methodology
Методология dataCraft Core включает в себя разбиение процесса обработки данных на 10 этапов (слоёв):
I. Normalize
II. Incremental
III. Join
IV. Combine
V. Hash
VI. Link
VII. Full
VIII. Graph
IX. Attribution
X. Dataset
Normalize или нормализация - первый слой обработки данных в рамках методологии.
На нём происходит подготовка сырых данных для дальнейшей работы.
Обычно сырые данные из Airbyte передаются в хранилище в формате JSON. Пример данных:
{"Date":"2024-03-18","CampaignId":"11102222","CampaignName":"кампания Х","CampaignType":"TEXT_CAMPAIGN","Cost":"13210260000","Impressions":"1750","Clicks":"158"}
На этапе normalize
происходит преобразование в более удобный формат - данные извлекаются и распределяются по столбцам, формируется таблица с привычной для пользователей структурой.
На этом этапе только извлекаем данные из JSON, никаких специальных преобразований не производится.
Поскольку это самый первый этап обработки данных, то зависит только от таблиц, передаваемых из Airbyte в хранилище данных.
Название файлов модели на этом этапе должно состоять из пяти частей: normalize_{название источника данных}_{название пайплайна}_{название шаблона}_{название стрима}
Ознакомится подробнее с тем, что такое пайплайн, шаблон и стрим в рамках нашей методологии, можно в разделе Terms.
Создаётся один файл на каждую уникальную комбинацию источника данных, шаблона и стрима.
SELECT
-- извлекаем данные из JSON:
JSONExtractString(_airbyte_data, 'event_receive_datetime') AS __date,
JSONExtractString(_airbyte_data, '__clientName') AS __clientName,
JSONExtractString(_airbyte_data, '__productName') AS __productName,
JSONExtractString(_airbyte_data, 'appmetrica_device_id') AS appmetrica_device_id,
JSONExtractString(_airbyte_data, 'city') AS city,
JSONExtractString(_airbyte_data, 'deeplink_url_parameters') AS deeplink_url_parameters,
JSONExtractString(_airbyte_data, 'event_receive_datetime') AS event_receive_datetime,
JSONExtractString(_airbyte_data, 'google_aid') AS google_aid,
JSONExtractString(_airbyte_data, 'ios_ifa') AS ios_ifa,
JSONExtractString(_airbyte_data, 'os_name') AS os_name,
JSONExtractString(_airbyte_data, 'profile_id') AS profile_id,
JSONExtractString(_airbyte_data, 'publisher_name') AS publisher_name,
-- добавляем технические столбцы:
toLowCardinality(_dbt_source_relation) AS __table_name, -- столбец с названием таблицы, содержащей сырые данные
toDateTime32(substring(toString(_airbyte_extracted_at), 1, 19)) AS __emitted_at, -- столбец с датой выгрузки данных
NOW() AS __normalized_at -- когда была проведена нормализация
-- данные берём из таблицы, которую Airbyte передал в хранилище данных:
FROM ((
SELECT
toLowCardinality(
'datacraft_clientname_raw__stream_appmetrica_default_accountid_deeplinks'
) AS _dbt_source_relation,
toString("_airbyte_raw_id") AS _airbyte_raw_id,
toString("_airbyte_data") AS _airbyte_data,
toString("_airbyte_extracted_at") AS _airbyte_extracted_at
FROM airbyte_internal.datacraft_clientname_raw__stream_appmetrica_default_accountid_deeplinks
))
В dbt есть такое понятие как материализация (materialization) - то в каком виде будут сохраняться результаты запроса. В данном случае мы не указываем какой-то специальный тип материализации и данные по умолчанию сохраняются как view
(представление).
Статус
В настоящий момент этап нормализации полностью автоматизирован в dataCraft Core.
Макрос
{{ datacraft.normalize() }}
Incremental или инкрементальный слой обработки данных - второй шаг в рамках нашей методологии.
Обновление данных может выполняться двумя способами: полной перезаписью или добавлением новых данных к уже существующим. Это позволяет эффективно обрабатывать большие объёмы информации и снизить нагрузку на систему при обновлении. Выбор между полным и частичным обновлением зависит от типа пайплайна. Для пайплайнов, где данные группируются по дате и/или пользователю/событию, предпочтительнее частичное обновление — добавление новых данных к существующим. Для пайплайнов, где данные группируются по периоду (например, медиаплны) или представляют собой справочники, подходит полная замена данных.
Поскольку это второй этап обработки данных, то он зависит только от таблиц нормализации, полученных на первом шаге normalize.
Название файлов модели на этом этапе должно состоять из пяти частей: incremental_{название источника данных}_{название пайплайна}_{название шаблона}_{название стрима}
Создаётся один файл, соответственно каждому файлу нормализации.
Для полной замены данных:
{{ config(
materialized='table',
order_by=('__table_name'),
on_schema_change='fail'
)
}}
SELECT * FROM {{ ref('normalize_appmetrica_registry_default_profiles') }}
Для дозаписи или частичного обновления данных:
{{ config(
materialized='incremental',
order_by=('__date', '__table_name'),
incremental_strategy='delete+insert',
unique_key=['__date', '__table_name'],
on_schema_change='fail'
)
}}
SELECT * FROM {{ ref('normalize_appmetrica_events_default_installations') }}
Тип материализации зависит типа пайплайна. В dataCraft Core выделяем 4 вида пайплайнов: Events, Datestat, Periodstat, Registry.
materialized='incremental'
). Инкрементальные модели позволяют dbt вставлять или обновлять записи в таблице с момента последнего запуска этой модели, без необходимости полной перезаписи таблицы.materialized='table
. В таком случае при каждом запуске модели таблица полностью перезаписывается.Статус
В настоящий момент этап инкрементализации полностью автоматизирован в dataCraft Core.
Макрос
{{ datacraft.incremental() }}
Join - третий слой обработки данных в рамках методологии dataCraft Core.
В рамках этого слоя объединяем в одну таблицу все стримы, которые есть для комбинации “источник+пайплайн”, за исключением пайплайнов registry
, относящихся к глобальному подтипу (подробнее см. Pipeline). Например, у AppMetrica
для пайплайна events
- 5 стримов: deeplinks
, events
, install
, screen_view
и sessions_starts
. На этом слое все они объединяются в таблицу join_appmetrica_events
.
Также на этом слое происходит объединение технических registry с основной таблицей источника. Например, у MyTarget
может быть три стрима: 1) banners_statistics
, относящийся к пайпалйну datestat
; 2) banners
и 3) campaigns
, относящиеся к техническим registry
. В этом случае все три стрима объединяются в одну таблицу join_mt_datestat
.
Также на слое Join
выполняется первичная предобработка данных: приведение к нужным типам, извлечение значений с помощью регулярных выражений (если нужно), добавление НДС и других бизнес-правил.
Важно, чтобы поля из разных источников, содержащие одни и те же показатели, во всех таблицах назывались одинаково. Поэтому на этом шаге также переименовываем все поля и приводим к единому стандарту. Для нейминга используем стиль camelCase
. Пример:
Яндекс.Метрика
поле с id кампании называется CampaignId
VK Ads
поле с id кампании называется id
adId
.Ещё одно важное действие на слое Join
- добавление новой колонки __link
, содержащей название линка, к которому относятся данные (подробнее см. Link). Этот столбец понадобится на этапе Hash
.
Способ объединения таблиц зависит от типа источника данных:
AppMetrica
схожие стримы объединяются в одину таблицу с помощью UNION ALL
. Чтобы объединить таблицы таким способом, поля содержащие одни те же показатели, должны называться одинаково, а количество столбцов должно быть равным и они должны располагаться в одинаковом порядке.VK Ads
или MyTarget
, объединяются с помощью JOIN
по общему полю, например, id
кампании.Если у комбинации источник+пайплайн только один стрим или данные относятся к пайплайну Registry
глобального подтипа, то на этом шаге проводится только предобработка данных и приведение названий столбцов к единому стандарту.
Перед переименованием полей в разных источниках необходимо составить справочник стандартных названий.
Зависит от всех таблиц слоя Incremental
, относящихся к одному источнику и пайплайну (исключение объединение с техническими registry
, в этом случае зависит от всех таблиц слоя Incremental
, относящихся к одному источнику и пайплайну, плюс от таблиц Incremental
, относящихся к техническим registry
.
Registry
, название состоит из трёх частей: join_{название источника}_{название пайплайна}
. Registry
название формируется следующим образом: join_{название источника}_{название пайплайна}_{название линка}
Создаётся один файл на каждую комбинацию источник+пайплайн. Исключение пайплайн Registry
глобального подтипа. В этом случае создаётся модель на каждую комбинацию источник+пайплайн+линк.
{{ config(
materialized='incremental',
order_by=('__date', '__table_name'),
incremental_strategy='delete+insert',
unique_key=['__date', '__table_name'],
on_schema_change='fail'
) }}
-- для каждого стрима создаём его CTE с одинаковым набором полей и их расположением
-- первый стрим - deeplinks
WITH join_appmetrica_events_deeplinks AS (
SELECT
toDateTime(__date) AS __date,
toLowCardinality(__table_name) AS __table_name,
toDateTime(event_datetime) AS event_datetime,
toLowCardinality(splitByChar('_', __table_name)[6]) AS accountName,
appmetrica_device_id AS appmetricaDeviceId,
assumeNotNull(COALESCE(nullIf(google_aid, ''), nullIf(ios_ifa, ''), appmetrica_device_id, '')) AS mobileAdsId,
profile_id AS crmUserId,
'' AS visitId,
'' AS clientId,
'' AS promoCode,
os_name AS osName,
city AS cityName,
assumeNotNull(coalesce({{ datacraft.get_adsourcedirty() }}, publisher_name, '')) AS adSourceDirty,
<...>
0 AS installs,
'' AS installationDeviceId,
__emitted_at,
toLowCardinality('AppDeeplinkStat') AS __link
FROM {{ ref('incremental_appmetrica_events_default_deeplinks') }}
)
<...>
-- пятый стрим - sessions_starts
, join_appmetrica_events_sessions_starts AS (
SELECT
<...>
FROM {{ ref('incremental_appmetrica_events_default_sessions_starts') }}
)
-- теперь делаем UNION записанных ранее CTE
, final_union AS (
SELECT *
FROM join_appmetrica_events_deeplinks
UNION ALL
SELECT *
FROM join_appmetrica_events_events
UNION ALL
SELECT *
FROM join_appmetrica_events_install
UNION ALL
SELECT *
FROM join_appmetrica_events_screen_view
UNION ALL
SELECT *
FROM join_appmetrica_events_sessions_starts
)
SELECT *
FROM final_union
Для всех join таблиц тип материализации 'view'
.
Статус
В настоящий момент этап `Join' полностью автоматизирован в dataCraft Core.
Макрос
{{ datacraft.join() }}
Combine или объединение - четвертый слой обработки данных в рамках методологии.
На этом слое происходит объединение всех одинаковых пайплайнов. Позволяет собрать данные их разных источников вместе.
Например, пайплайн 'events'
есть у двух источников - Яндекс.Метрика
и AppMetrica
. Следовательно, данные по событиям из этих двух источников можем объединяем в одну таблицу.
Поскольку на этапе Join мы подготовили наши данные и переименовали одинаково столбцы в разных источника, то объединение сводится к простому UNION ALL
всех таблиц с одинаковым пайплайном. Если каких-то столбцов не хватает в одной из таблиц, то создаётся столбец
'' as utmSource
0 AS sales
Исключение составляют таблицы, относящиеся к пайплайну Registry
глобального подтипа.
Зависит от таблиц слоя Join
c одинаковыми пайплайнами. В случае Registry
глобального подтипа от таблиц слоя Join
с одинаковой комбинацией пайпланй+линк.
Название файлов модели формируется так:
combine_{название пайплайна}
- для всех пайплайнов, кроме Registry
глобального подтипаcombine_{название пайплайна}_{название линка}
- для Registry
глобального подтипаСоздаётся один файл для каждого пайплайна. В случае пайплайна Registry
для каждой комбинации пайплайн+линк.
{{ config(
materialized='incremental',
order_by=('__date', '__table_name'),
incremental_strategy='delete+insert',
unique_key=['__date', '__table_name'],
on_schema_change='fail'
)
}}
WITH all_events AS (
SELECT *
FROM {{ ref('join_appmetrica_events') }}
UNION ALL
SELECT *
FROM {{ ref('join_ym_events') }}
)
SELECT *
FROM all_events
В случае пайпалйна Registry, если существует только одна комбинация пайплайн+линк, запрос сводится к простому селекту:
{{ config(
materialized='table',
order_by=('__table_name'),
on_schema_change='fail'
)
}}
SELECT * FROM {{ ref(join_источник_registry_линк) }}
А если комбинация пайплайн+линк есть в ещё каком-то источнике, то:
{{ config(
materialized='table',
order_by=('__table_name'),
on_schema_change='fail'
)
}}
SELECT * FROM {{ ref(join_источник_registry_линк) }}
UNION ALL
SELECT * FROM {{ ref(join_другойисточник_registry_линк) }}
Если данные относятся к пайплайнам datestat
или events
используем инкрементальный тип материализации: materialized='incremental'
.
В остальных случаях: materialized='table'
.
Статус
В настоящий момент этап combine полностью автоматизирован в dataCraft Core.
.
Макрос
{{ datacraft.combine() }}
Hash или слой хэширования данных - пятый шаг в рамках нашей методологии.
Hash
(хэш) - это уникальный ключ, который рассчитывается как функция бизнес-ключа. Бизнес-ключ, в свою очередь, - это комбинация одного или нескольких полей, по которым можно уникально идентифицировать запись в датасете. В отличии от бизнес-ключа, хэш всегда имеет одинаковую длину и с ним удобнее работать.
На этом слое происходит добавление к данным столбцов, содержащих хэши. Хэши необходимы на следующих этапах работы с данными:
Link
Full
Graph
.Процесс формирования бизнес-ключа и затем хэша включает несколько этапов.
metadata.sql
. В нём описываются все сущности в данных и какие поля уникально их идентифицируют, а также какие сущности к каким Link|линкам относятся. Пример метадаты можно посмотреть тут.MD5
.Схематично получение хэша можно представить следующим образом:
*Каждый прямоугольник на схеме обозначает отдельную функцию.
Специальный макрос с помощью metadata.sql
и колонки __link
отбирает поля, необходимые для формирования бизнес-ключа, производит все необходимые преобразования и формирует поля с хэшами. В методологии dataCraft Core выделяется две группы хэшей:
main_entities
(см. Link ) и являются основными хэшами. По ним проводится дедупликация данных на следующем слое. Для формирования этой группы хэшей в основном макросе используется дополнительный - link_hash()
. Колонка, содержащая эти хэши, называется __id
.glue: yes
. Эти хэши необходимы для графовой склейки.JOIN
между разными пайплайнами. Например, соединить данные по событиям (пайплайн Events
) с каким-то справочником (пайплайн Registry
). В этом случае хэш будет формироваться на основе пересекающихся сущностей, то есть если в одном линке сущность из main_entities
или other_entities
входит main_entities
какого-то другого линка, относящегося к пайплану Registry, то создаётся хэш для данной сущности в табличке hash_{название первого линка}
.Вторая группа - дополнительные хэши, которые не всегда необходимы. Они используются на этапах, которые идут после слоя Link. Для формирования хэшей по сущностям, внутри основного макроса, используется макрос entity_hash()
.
Зависит от таблиц с соответствующим пайплайном, полученных на этапе Combine. Для глобальных Registry
зависит от таблицы с соответствующей комбинацией пайплайн+линк.
Название файлов модели на этом этапе:
hash_{название_пайплайна}
- для всех пайплайнов, кроме Registry
hash_{название_пайплайна}_{линк}
- для Registry
глобального подтипа.Для всех пайплайнов, кроме Registry
, создаётся один файл, соответственно каждому файлу слоя Combine
. Так как на слое Combine
происходит объединение всех данных в рамках одного пайплайна, получается одна модель Hash
на каждый пайплайн.
Для пайплайна Registry
создаются модели для всех комбинаций пайплайн+линк.
(в упрощённом виде и без использования основного макроса)
{{ config(
materialized='table',
order_by=('__table_name'),
on_schema_change='fail'
)
}}
{% set metadata = fromyaml(metadata()) %}
SELECT
*,
{{ link_hash('MediaplanStat', metadata) }},
{{ entity_hash('UtmHash', metadata) }}
FROM {{ ref('combine_periodstat') }}
Зависит от типа пайплайна:
Events
и Datestat
- инкрементальный тип. Инкрементальность идет по тем же полям, что и на слое Incremental
- по дате.Periodstat
, Registr
y инрементальности нет, в этом случае данные материализуются как 'table'
Статус
В настоящий момент в dataCraft Core этап хэширования полностью автоматизирован.
Макрос
{{ datacraft.hash() }}
Link - шестой слой обработки данных. Если дополнительных преобразований не требуется, то данный слой является последним.
На этом слое происходит дедупликация данных. В link-таблицы попадают только уникальные строки.
Дедупликация проводится c помощью хэшей, которые были получены на прошлом слое. Принцип сохранения строк - оставляем уникальные строки:
__date
для пайплайнов с инкрементальным типом материализации (Events
и Datestat
)Registry
и Periodstat
) Данные группируются по значениям хэшей и выбирается максимальное значение для нечисловых полей и сумма (или другая агрегация) для численных полей.Зависит от таблиц с соответствующим пайплайном, полученных на слое Hash
. Для пайплайна Registry
- от с сответствующей комбинацией пайплайн+линк.
Registry
, название моделей на этом слое формируется из двух частей: link_{название пайплайна}
.Registry
глобального подтипа: link_{название_пайплайна}_{линк}
.Создаётся по одной модели на каждую модель слоя Hash
.
(без использования макроса)
{{ config(
materialized='table'
)
}}
SELECT MAX(accountName) AS accountName,
<...>
SUM(adCost) AS adCost,
SUM(impressions) AS impressions,
SUM(clicks) AS clicks
FROM {{ ref('hash_datestat') }}
GROUP BY __id
Правило материализации аналогично предыдущему слою:
Events
и Datestat
, используется инкрементализация: materialized='incremental'
materialized='table'
Статус
В настоящий момент в dataCraft Core этап дедупликации полностью автоматизирован.
Макрос
{{ datacraft.link() }}
Данный слой необходим для объединения данных со справочниками или данных из разных пайплайнов в единую таблицу.
Помимо этого, в зависимости от задачи, на этом слое можно добавлять поля или проводить дополнительную обработку строк.
Если никаких объединений и преобразований не производится, то этот слой всё равно реализуется, чтобы настроить материализацию.
Преобразования и с чем объединяются link-таблици зависит от пайплайна:
Events
- производится JOIN
со всеми доступными справочниками (пайплайн Registry
) и добавляться поле qid
из графовых таблиц (см. VIII. Graph).Datestat
- только JOIN
со справочниками (пайплайн Registry
)Periodstat
- приджойниваться Registry
и данные разбиваются по датам между первой и последней датой периодаДля джойна используются хэши, полученные на слое Hash
по сущностям (см. V. Hash). При объединении, обычно, левой таблицей является линк-таблица, а справочник - правой, и используется LEFT JOIN
. Это необходимо для того, чтобы не потерять данные из основной таблицы в случае, если в справочнике нет какой-то информации, которая есть в основной линк-таблице.
Для пайплайна Registry
full-таблицы не создаются.
Зависит от таблиц, полученных на слое Link
. Исключение пайплайн Events
. Он также зависит от таблицы graph_qid
, получаемой на слое Graph
.
Название файлов модели на этом этапе состоит из двух частей: full_{название пайплайн}
Создаётся один файл для каждого пайплайна. Исключение таблицы-справочник пайплайна Registry
. Для них отдельный файл не создаётся, так как они присоединяются к основным данным из других пайплайнов.
Если никаких преобразований и объединений не производим, то запрос сводится к простому селекту:
{{
config(
materialized = 'table',
order_by = ('__datetime')
)
}}
SELECT
*
FROM {{ ref('link_events') }}
В случае с объединением со справочником запрос выглядел бы так:
{{
config(
materialized = 'table',
order_by = ('__datetime')
)
}}
WITH link_events AS (SELECT *
FROM {{ ref('link_events') }}),
link_registry AS (SELECT *
FROM {{ ref('link_registry') }})
SELECT * FROM link_events as t1
LEFT JOIN link_registry as t2 USING(utmHashHash)
Для всех пайплайнов, кроме Datestat
, материализация на этом слое - 'table'
. Для Datestat
используем инкрементальный тип материализации: materialized='incremental'
.
Статус
Этап Full
полностью автоматизирован.
Макрос
{{ datacraft.full() }}
Следующие два слоя graph
и attribution
в основном используются для маркетинговой аналитики.
Graph
или слой графовой склейки.
На слое Link мы очистили данные от явных дубликатов, но проблема дублирования может возникать и на более высоком уровне. Данные могут иметь аккуратные однотипные идентификаторы, но при этом одному и тому же реальному объекту может соответствовать несколько бизнес-ключей. Рассмотрим пример с данными из CRM-системы. Может получится так, что на одного и того же клиента заведено две (или более) карточки. Такое может произойти, если, например, у клиента несколько номеров телефона. Обычно, если в компанию поступает звонок от клиента с неизвестного номера, у него спрашивают, обращался ли он ранее. Если он говорит, что обращался, в имеющуюся карточку добавляют новый номер. Если нет, то создают новую карточку. Клиент не обязан отвечать правду и помнить свои прошлые обращения. Возникает дублирование данных по одному и тому же клиенту.
Слой Graph
нужен для того, чтобы объединить все данные по одному пользователю/объекту, у которого есть различные идентификаторы, как, например, в примере с разными номерам телефона, но нет фиксированного id
.
Для объединения всех данных по одному объекту используется графовая склейка. Граф - это множество вершин (узлов, node) и ребер (взаимосвязей, edge). Если вернуться к примеру с номерами телефонов и карточками в CRM-системе, карточки и номера телефонов будут является вершинами (узлами), а связи между ними - рёбрами.
Упрощённо алгоритм графовой склейки для нахождения дублей можно описать так:
Реализация графовой склейки в dataCraft Core разбита на 6 подэтапов. Каждый подэтап реализуется с помощью отдельного макроса:
graph_tuples
: задача этого макроса составить на основе данных из link-таблицы и metadata.sql
таблицу соответствия из двух колонок. В первой колонке hash
содержится кортеж (tuple
) из трех элементов:
__link
) (например, AppInstallStat
)__id
)node_left
- данные по которым можно идентифицировать пользователя. Эта колонка также является кортежем из трёх элементов:crmUserHash
(в файле metadata.sql
сущности, по которым можно идентифицировать пользователя, имеют метку glue: yes
)toDateTime(0)
)crmUserHash
)Рассмотрим суть этой таблицы на примере: для AppInstallStat
главный хэш рассчитается по
Account
AppMetricaDevice
MobileAdsId
CrmUser
OsName
City
AdSource
UtmParams
UtmHash
Может быть, например, так, что у одного CrmUser
несколько AppMetricaDevice
, например три, тогда для этого пользователя будет три разных главных хэша __id
. В этом случае в колонке node_left
будет три строчки с одинаковым содержанием и им будет соответствовать три строчки в колонке hash
c разными кортежами.
В matadata.sql
мы описываем модель склейки - какие колонки нужны для неё. В таблице, которую получаем с помощью этого макроса, содержатся все возможные комбинации между содержимым этих колонок.
graph_lookup
: этот макрос формирует список уникальных hash
и node_left
и присваивает каждому номер - key_number
. Необходимо для экономии памяти, так как содержимое колонок таблицы подслоя graph_tuples
занимает довольно большой объём. То есть создаётся своего рада справочник. Далее, вместо hash
и node_left
, будем использовать их номера в качестве идентификатора. Уникальные значения hash
и node_left
записываются в колонку kay_hash
.
graph_unique
: содержит те же данные, что и подслой graph_lookup
. Отличие - порядок сортировки. В graph_lookup
в таблицу записываем данные отсортированные по возрастанию key_number
, а в graph_unique
- по kay_hash
в алфавитном порядке. Необходимо для удобства дальнейшей обработки.
graph_edge
: заменяем в таблице graph_tuples
кортежи (таплы) на key_number
с помощью джойнов. Переименовываем колонки: hash
и node_left
называем node_id_right
и node_id_left
соответственно. Плюс добавляем дополнительные колонки:
group_id
(дублирует колонку node_id_left
)has_changed
- содержит единицу, служебное полеpost_hook
, меняем местами содержимое колонок node_id_right
и node_id_left
.graph_glue
: с помощью этого макроса и дополнительного calc_graph
реализуется склейка. Представляет собой цикл, в котором последовательно джойним таблицу, которую получили на подэтапе graph_eage
, саму с собой сначала используя в качестве ключа node_id_left
, а затем node_id_right
. Как это происходит:
graph_eage
по node_id_left
и вычисляем минимальный id группы: min(group_id) as min_group_id
. (помним, что мы поменяли местами содержимое столбцов node_id_left
и node_id_right
на прошлом шаге, а столбец group_id
содержит значения исходного node_id_left
).graph_eage
по node_id_left
.node_id_right
has_changed
не станут равны нулю или цикл выполнится 14 раз (ограничение, которое устанавливаем самостоятельно. Опытным путём было установлено, что 14 итераций достаточно, чтобы найти все связи между узлами графа. Необходимость устанавливать определённое количество итераций для цикла обусловлено тем, что в dbt нельзя делать бесконечные циклы). Важно отметить, что после каждой итерации данные в таблице graph_eage
обновляются.В итоге все идентификаторы, которые относятся к одному пользователю получают один group_id
.
Схематично действие этого макроса можно представить следующим образом:
Источник
После срабатывания макроса calc_glue
, с помощью макроса graph_glue
получаем уникальный идентификатор пользователя qid
. Для этого группируем таблицу graph_edge
по node_id_left
и отбираем min(group_id) as qid
.
graph_qid
: с помощью этого макроса формируем таблицу-справочник, в которой каждому уникальному ключу, состоящему из (__link, __datetime, __id)
, соответствует уникальный идентификатор пользователя qid
из ранее созданной таблицы graph_glue
. Эту таблицу получаем путём объединения таблицы graph_glue
с таблицей graph_lookup
.
Последовательное выполнение этих макросов позволяет пошагово преобразовать исходные данные в графовую структуру и вычислить идентификаторы групп для каждого узла.
Подэтап graph_tuples
зависит от таблиц, полученных на слое Link
. Какие именно link-таблицы используются указывается в описании моделей графовой склейки glue_models
в metadata.sql
. Далее, каждая последующая модель зависит от результата выполнения предыдущей модели слоя.
Для слоя Graph
создаётся 6 моделей, которые выполняются последовательно.
Название моделей соответствует названиям макросав, которые их формируют:
graph_tuples.sql
graph_lookup.sql
graph_unique.sql
graph_edge.sql
graph_glue.sql
graph_qid.sql
Создаётся по одному файлу модели на каждый подэтап слоя.
(без макросов)
graph_tuples
{{ config(
materialized='table',
on_schema_change='fail'
)
}}
select
tuple(toLowCardinality(__link), __datetime, __id) as hash,
tuple(toLowCardinality('CrmUserHash'), toDateTime(0), CrmUserHash) as node_left
from {{ ref('link_events') }}
where nullIf(CrmUserHash, '') is not null
union all
select
tuple(toLowCardinality(__link), __datetime, __id) as hash,
tuple(toLowCardinality('YmClientHash'), toDateTime(0), YmClientHash) as node_left
from {{ ref('link_events') }}
where nullIf(YmClientHash, '') is not null
union all
<...>
И так для всех сущностей, помеченных в metadate.sql
меткой glue: yes
graph_lookup
{{
config(
materialized='table',
order_by=('key_number')
)
}}
with all_keys as
(
select distinct hash as key_hash from {{ ref('graph_tuples') }}
union distinct select distinct node_left as key_hash from {{ ref('graph_tuples') }}
)
select *, row_number() over() as key_number from all_keys
graph_unique
{{
config(
materialized='table',
order_by=('key_hash')
)
}}
select * from {{ ref('graph_lookup') }}
graph_edge
{{
config(
materialized='table',
post_hook= {
'sql': 'insert into {{target.schema}}.graph_edge(node_id_left, node_id_right, group_id, has_changed)
select
node_id_right,
node_id_left,
group_id,
has_changed
from {{target.schema}}.graph_edge;'
}
)
}}
with join_left as (
select key_number as node_id_left,
node_left
from {{ ref('graph_tuples') }} x
join {{ ref('graph_unique') }} y on x.hash = y.key_hash
)
select node_id_left,
key_number as node_id_right,
node_id_left as group_id,
1 as has_changed
from join_left x
join {{ ref(graph_unique) }} y on x.node_left = y.key_hash
graph_glue
{{
config(
materialized='table',
order_by=('node_id_left'),
pre_hook="{{ datacraft.calc_graph() }}"
)
}}
select
node_id_left,
min(group_id) as qid
from {{ ref('graph_edge') }}
group by node_id_left
Код макроса `datacraft.calc_graph()` выглядит следующим образом:
{% macro calc_graph() %}
{# Запрос для обновления правой таблицы #}
{% set right_query %}
create or replace table {{ target.schema }}.graph_right engine=Log() as
with
min_group_id as (
select
node_id_left,
min(group_id) as min_group_id
from {{ target.schema }}.graph_edge
group by node_id_left
)
select
node_id_left,
node_id_right,
min_group_id as group_id,
min_group_id != e.group_id as has_changed
from {{ target.schema }}.graph_edge e
join min_group_id r on r.node_id_left = e.node_id_left
{% endset %}
{# Запрос для обновления левой таблицы #}
{% set left_query %}
create or replace table {{ target.schema }}.graph_edge engine=Log() as
with
min_group_id as (
select
node_id_right,
min(group_id) as min_group_id
from {{ target.schema }}.graph_right
group by node_id_right
)
select
node_id_left,
node_id_right,
min_group_id as group_id,
min_group_id != e.group_id as has_changed
from {{ target.schema }}.graph_right e
join min_group_id r on r.node_id_right = e.node_id_right
{% endset %}
{# Запрос для проверки наличия изменений #}
{% set check_changed %}
select
max(has_changed)
from {{ target.schema }}.graph_edge
{% endset %}
{# Если необходимо выполнить запросы #}
{% if execute %}
{% set ns = namespace(check_change=1) %}
{% for i in range(0, 14) %}
{{ log("Running iteration " ~ i) }}
{{ check_change }}
{# Проверяем, были ли изменения #}
{% if ns.check_change == 1 %}
{# Обновляем правую таблицу #}
{% do run_query(right_query) %}
{# Обновляем левую таблицу #}
{% do run_query(left_query) %}
{# Проверяем наличие изменений в данных #}
{% set ns.check_change = run_query(check_changed).rows[0][0] %}
{{ log('VALUE: ' ~ ns.check_change) }}
{% endif %}
{% endfor %}
{% endif %}
{% endmacro %}
graph_qid
{{
config(
materialized='table',
order_by=('__datetime', '__link', '__id'),
pre_hook="{{ datacraft.calc_graph() }}"
)
}}
select
toLowCardinality(tupleElement(key_hash, 1)) as __link,
tupleElement(key_hash, 2) as __datetime,
tupleElement(key_hash, 3) as __id,
qid
from {{ ref('graph_glue') }}
join {{ ref('graph_lookup') }} on key_number = node_id_left
Для всех пайплайнов и на всех подшага данные материализуются как таблица - materialized='table'
.
Статус
В dataCraft Core слой графовой склейки полностью автоматизирован.
Макросы
datacraft.graph_tuples()
datacraft.graph_lookup()
datacraft.graph_unique()
datacraft.graph_edge()
datacraft.graph_glue()
+ datacraft.calc_glue()
datacraft.graph_qid()
+ datacraft.calc_glue()
Attribution или шаг атрибуции.
Атрибуция - это приписывание ценности за конверсию источникам трафика. Перед тем, как совершить конверсию (звонок, заказ, покупку), пользователь обычно совершает какие-то взаимодействия с компанией, например заходит на сайт. Эти взаимодействия называются событиями. У части событий мы можем определить маркетинговый источник, благодаря которому они произошли.
Чтобы оценить эффективность вложений в источник, необходимо знать сколько конверсий он принёс. Но не всегда можно однозначно определить источник. Чаще всего пользователь совершает несколько контактов с продуктом, прежде чем совершить покупку или другое целевое действие. Чтобы определить "главный" источник, принёсший конверсию, используют различные модели атрибуции. Модель атрибуции представляет собой правило или набор правил, по которым источнику присваивается ценность (степень его вклада в конверсию).
В зависимости от того, как именно мы распределим пользователей по источникам, мы получим разную картину происходящего. И сделаем разные выводы по эффективности источников трафика.
В dataCraft Core на слое Attribution
как раз и происходит определение главного, согласно выбранной модели атрибуции, источника трафика и присвоение событиям (строкам в таблице) параметров относящихся к этому источнику.
Слой Attribution
состоит из 9 последовательных подэтапов. Для каждого подэтапа разработан свой макрос:
attr_prepare_with_qid
: в этом макросе с помощью LEFT JOIN
объединяем full-таблицу с таблицей graph_qid
. Это необходимо, чтобы добавить к ней уникальный идентификатор пользователя qid
.
attr_create_events
: на этом подэтапе определяем шаг воронки для каждой строки. Это реализуется с помощью конструкции CASE WHEN
. Для каждой строчки производится сопоставление значений из таблицы attr_prepare_with_qid
c описанием шагов воронки в конфиге events. В этом конфиге мы указываем: название шага, к какому линку он относится и какое условие должно выполнятся, чтобы строку в attr_prepare_with_qid
можно было отнести к этому шагу. Если условия, прописанные в CASE WHEN
, выполняются, то строке присваивается порядковый номер шага воронки. Если событие не соответствует ни одному из шагов воронки, ему присваивается 0. Столбец с этими номерам получает название __priority
. Также для каждой строчки указывается название шага. Эта колонка получает название __step
.
На этом шаге оставляем в таблице не все имеющиеся данные, а только qid
, __link
, __priority
, __id
, __datetime
, и __step
.
attr_add_row_number
: на этом подэтапе добавляем к таблице, полученной на attr_create_events
, порядковый номер строки __rn
с помощью оконной функции, разбивая данные по qid
, и сортируем данные по __datetime
, __priority
, __id
.
attr_find_new_period
: с помощью этого макроса определяем когда у пользователя начался новый [период активности](# “это заранее определённый период взаимодействия клиента с продуктом. Если интервал между каждым взаимодействием (событием) меньше установленного периода, то эти действия клиента относятся к одному периоду активности, а если больше, то к разным. Период активности зависит от специфики бизнеса и устанавливается индивидуально.”). Как это происходит: в конфиге attributions, в разделе funnel_steps
содержится максимальный период (обычно в днях), который может быть между двумя последовательными событиями воронки. С помощью оконной функции для каждого пользователя вычисляем временную разницу между событиями. Если эта разность меньше установленного периода, то события считаются принадлежащими к одному периоду активности и в новую колонку __is_new_period
записывается FALSE
, а если разность больше, то событие является началом нового периода активности и в колонке __is_new_period
записывается TRUE
. Если период в metadata.sql
не указан, то за период активности принимается период в 90 дней.
attr_calculate_period_number
: присваиваем каждому периоду свой порядковый номер __period_number
. Все события (все строки) в рамках одного периода активности будут иметь одинаковый порядковый номер. Реализуется за счёт оконной функции с партицией по qid
и сортировкой по __rn
. Каждый раз, когда в рамках одного qid
встречается TRUE
в колонке __is_new_period
, порядковый номер периода увеличивается на единицу.
attr_create_missed_steps
: создаёт пропущенные шаги воронки. Это необходимо для упрощения следующих шагов, на которых уже непосредственно будет выполняться атрибуция. Добавляем пропущенные шаги только в том случае, если они были пропущены в рамках всей пользовательской истории взаимодействия с продуктом, а не в какой-то конкретный период активности. Например, в первый период активности пользователь прошёл шаги №1 и №2, потом долго (более установленного периода) не взаимодействовал с продуктом и в новый период активности сразу зашёл на шаг №3 и дальше пошёл по воронке. В таком случае для него не будут создаваться пропущенные шаги №1 и №2, так как он уже на них когда-то был. И наоборот, если пользователь был на всех шагах воронки со второго по последний, а на первом никогда не был, то для него создаётся фиктивный первый шаг с пометкой TRUE
в специальной колонке __if_missed
.
Как это работает: для каждого qid
находим максимальный шаг max(__priority)
. Потом создаём дополнительную табличку generate_all_priorities
со всеми шагами от первого до этого максимального с помощью функций range
и arrayJoin
. Затем джойним её с основной таблицей, содержащей только реальные данные. Таким образом получаем таблицу, в которой появляются строки с пропущенными шагами и мы можем их чем-то заполнить, а в служебной колонке __if_missed
пометить, что это пропущенные шаги.
Принцип заполнения строк для пропущенных шагов: заполняем ближайшим не нулевым значением в рамках одного qid
, отсортированного по номеру шага, т.е. данными первого не пустого шага. Заполняем таким образом строки в колонках: __id
, __datetime
и __period_number
.
Также на этом шаге добавляем новый номер строк __rn
.
attr_join_to_attr_prepare_with_qid
: после шага attr_create_events
мы работали с таблицами, содержащими только основные данные. На этом шаге объединяем attr_prepare_with_qid
и attr_create_missed_steps
, чтобы снова располагать полными набором полей.
Также на этом шаге производится вычисление ранга для каждой модели в зависимости от ее приоритета. Список названий приоритетов для каждой модели указаны в конфиге attributions, а сами правила для каждого приоритета расписаны в конфиге event_segments. Помимо этого на этом шаге формируем новую колонку с источником трафика: для каждого пропущенного события указываем специальные значения, например 'Без веб сессии', 'Без установки'. Что конкретно указывать зависит от приоритета и того, что указано в конфиге events, содержащим описание шагов. Для реальных данных указываем просто их источник.
attr_model
: на прошлом шаге мы вычисляли ранг для каждой строчки, теперь с помощью оконной функции в рамках одного qid
и одного периода активности __period_number
присваиваем всем строчкам максимальный ранг (наиболее важный). Это нужно для подсчета числа целевых моделей (моделей с максимальным приоритетом) для каждого пользователя и периода. Каждый раз, когда ранг равен максимальному рангу, в вспомогательную колонку __{model_type}__rank_condition
записывается TRUE
. Затем подсчитывается сколько было TRUE
значений в рамках qid
и __period_number
. Благодаря этим подсчётам получаем поле __{model_type}_target_count
. Оно необходимо, чтобы разбить наши периоды на своеобразные подпериоды (модели) и внутри этих подпериодов для определённых полей (которые перечислены в конфиге attributions в разделе attributable_parameters
) создать новые колонки, которые будут содержать во всех строчках значение из первой строчки подпериода. То есть каждому событию будут присвоен данные соответствующие главному источнику трафика (главному согласно выбранной модели атрибуции).
attr_final_table
: это последний шаг слоя Attribution
. На нём просто объединяем в одну таблицу результаты двух предыдущих макросов: attr_join_to_attr_prepare_with_qid
и attr_model
.
Первый подэтап attr_prepare_with_qid
зависит от таблиц, полученных на слое Full
, и от финальной таблицы слоя Graph
- graph_qid
. Далее, каждая последующая модель зависит от результата выполнения предыдущей модели слоя (или нескольких):
attr_create_events
от 1.attr_prepare_with_qid
attr_add_row_number
от 2.attr_create_events
attr_find_new_period
от 3.attr_add_row_number
attr_calculate_period_number
от attr_find_new_period
attr_create_missed_steps
от 5.attr_calculate_period_number
attr_join_to_attr_prepare_with_qid
от 6.attr_create_missed_steps
и 1.attr_prepare_with_qid
attr_model
от 7.attr_join_to_attr_prepare_with_qid
attr_final_table
от 7.attr_join_to_attr_prepare_with_qid
и 8.attr_model
Чтобы сформировать название модели, к названию макроса добавляем название модели:
attr_{название модели атрибуции}_prepare_with_qid
attr_{название модели атрибуции}_create_events
attr_{название модели атрибуции}_add_row_number
attr_{название модели атрибуции}_find_new_period
attr_{название модели атрибуции}_calculate_period_number
attr_{название модели атрибуции}_create_missed_steps
attr_{название модели атрибуции}_join_to_attr_prepare_with_qid
attr_{название модели атрибуции}_model
attr_{название модели атрибуции}_final_table
ВАЖНО! Название модели атрибуции, которое вы указываете в названии моделей, должно совпадать с названием модели атрибуции, которое указано в конфиге attributions. От этого зависит корректная работа макросов.
На каждом подэтапе создаётся столько моделей, сколько моделей вы хотите исследовать и задали. Например, если у вас только одна модель атрибуции, то на каждом подэтапе создаётся по одной модели, если две, то две модели и так далее.
(приводятся примеры запросов как бы они выглядели без использования макросов)
attr_prepare_with_qid
:{{
config(
materialized='table',
order_by=('qid', '__datetime','__link','__id')
)
}}
select
y.qid, x.*
from {{ ref('full_events') }} as x
left join {{ ref('graph_qid') }} as y
using (__datetime,__link, __id)
attr_create_events
{{
config(
materialized='table',
order_by=('qid', '__datetime','__link','__id')
)
}}
select
qid,
__link,
CASE
WHEN __link = 'VisitStat' and osName = 'web' THEN 1
WHEN __link = 'AppInstallStat' and installs >= 1 THEN 2
WHEN __link = 'AppSessionStat' and sessions >= 1 THEN 3
WHEN __link = 'AppDeeplinkStat' THEN 3
WHEN __link = 'AppEventStat' and screenView >= 1 THEN 4
ELSE 0
END as __priority,
__id,
__datetime,
toLowCardinality(CASE
WHEN __link = 'VisitStat' THEN 'visits_step'
WHEN __link = 'AppInstallStat' THEN 'install_step'
WHEN __link = 'AppSessionStat' THEN 'app_visits_step'
WHEN __link = 'AppDeeplinkStat' THEN 'app_visits_step'
WHEN __link = 'AppEventStat' THEN 'event_step'
END) as __step
from {{ ref('attr_myfirstfunnel_prepare_with_qid') }}
attr_add_row_number
{{
config(
materialized='table',
order_by=('qid', '__datetime', '__link', '__id')
)
}}
select
*,
row_number() over (partition by qid order by __datetime, __priority, __id) AS __rn
from {{ ref('attr_myfirstfunnel_create_events') }}
attr_find_new_period
{{
config(
materialized='table',
order_by=('qid', '__datetime', '__link', '__id')
)
}}
with prep_new_period as (
select *,
max(case when __priority in [1, 2, 3, 4] then __datetime else null end)
over (partition by qid order by __rn rows between unbounded preceding and 1 preceding) as prep_new_period
from {{ ref('attr_myfirstfunnel_add_row_number') }}
)
select
qid,
__link,
__priority,
__id,
__datetime,
__rn,
__step,
CASE
WHEN __link = 'VisitStat' and toDate(__datetime) - toDate(prep_new_period) < 90 THEN false
WHEN __link = 'AppInstallStat' and toDate(__datetime) - toDate(prep_new_period) < 30 THEN false
WHEN __link = 'AppSessionStat' and toDate(__datetime) - toDate(prep_new_period) < 30 THEN false
WHEN __link = 'AppDeeplinkStat' and toDate(__datetime) - toDate(prep_new_period) < 30 THEN false
WHEN __link = 'AppEventStat' and toDate(__datetime) - toDate(prep_new_period) < 7 THEN false
ELSE true
END as __is_new_period
from prep_new_period
attr_calculate_period_number
{{
config(
materialized='table',
order_by=('qid', '__datetime', '__link', '__id')
)
}}
select *,
sum(toInt32(__is_new_period)) over (partition by qid order by __rn) AS __period_number
from {{ ref('attr_myfirstfunnel_find_new_period') }}
attr_create_missed_steps
{{
config(
materialized='table',
order_by=('qid', '__datetime','__link','__id')
)
}}
with calc_max_priority as (
select
qid,
__link,
__id,
__datetime,
__rn,
__priority,
__period_number,
__step,
max(__priority) over(partition by qid) as max_priority
from {{ ref('attr_myfirstfunnel_calculate_period_number') }}
),
generate_all_priorities as (
select
distinct qid, __link,
arrayJoin(range(1, assumeNotNull(max_priority) + 1)) as gen_priority
from calc_max_priority
),
final as (
select
first_value(__id) OVER (PARTITION BY qid ORDER BY gen_priority ROWS BETWEEN current row AND UNBOUNDED FOLLOWING ) as __id,
gen_priority as __priority,
qid, __link,
first_value(__datetime) OVER (PARTITION BY qid ORDER BY gen_priority ROWS BETWEEN current row AND UNBOUNDED FOLLOWING ) as __datetime,
first_value(__period_number) OVER (PARTITION BY qid ORDER BY gen_priority ROWS BETWEEN current row AND UNBOUNDED FOLLOWING ) as __period_number,
case when calc_max_priority.qid = 0 then true else false end as __if_missed,
__step
from generate_all_priorities
left join calc_max_priority
on generate_all_priorities.qid = calc_max_priority.qid and
generate_all_priorities.gen_priority = calc_max_priority.__priority
)
select
qid, __link, __id,
__priority, __datetime,
__period_number,
__if_missed,__step,
row_number() over (partition by qid order by __datetime, __priority, __id) AS __rn
from final
attr_join_to_attr_prepare_with_qid
{{
config(
materialized='table',
order_by=('qid', '__period_number', '__datetime', '__priority', '__id')
)
}}
select
y.__period_number as __period_number,
y.__if_missed as __if_missed,
y.__priority as __priority,
y.__step as __step,
x.*EXCEPT(adSourceDirty),
CASE
WHEN LENGTH (adSourceDirty) < 2 THEN 1
WHEN match(adSourceDirty, 'Органическая установка') THEN 2
WHEN __priority = 4 and not __if_missed = 1 THEN 3
WHEN __priority = 3 and not __if_missed = 1 THEN 4
WHEN __priority = 2 and not __if_missed = 1 THEN 5
WHEN __priority = 1 and not __if_missed = 1 THEN 6
ELSE 0
END as __last_click_rank,
CASE
WHEN __priority = 3 and not __if_missed = 1 THEN 1
WHEN __priority = 2 and not __if_missed = 1 THEN 2
WHEN __priority = 1 and not __if_missed = 1 THEN 3
ELSE 0
END as __first_click_rank,
CASE
WHEN __if_missed and __priority = 1 THEN '[Без веб сессии]'
WHEN __if_missed and __priority = 2 THEN '[Без установки]'
WHEN __if_missed and __priority = 3 THEN '[Без апп сессии]'
WHEN __if_missed and __priority = 4 THEN ''
ELSE adSourceDirty
END as adSourceDirty
from {{ ref('attr_myfirstfunnel_prepare_with_qid') }} AS x
join {{ ref('attr_myfirstfunnel_create_missed_steps') }} AS y
using (qid, __datetime, __link, __id)
attr_model
{{
config(
materialized='table',
order_by = ('qid', '__datetime', '__id')
)
}}
with
max_last_click_rank as (
select *
,max(__last_click_rank) over(partition by qid, __period_number order by __datetime, __priority, __id) as __max_last_click_rank
,max(__first_click_rank) over(partition by qid, __period_number order by __datetime, __priority, __id) as __max_first_click_rank
from {{ ref('attr_myfirstfunnel_join_to_attr_prepare_with_qid') }}
),
target_count as (
select *
, __last_click_rank = __max_last_click_rank as __last_click__rank_condition
, sum(case when __last_click__rank_condition then 1 else 0 end) over(partition
by qid, __period_number order by __datetime, __priority, __id) as
__last_click__target_count
, __first_click_rank = __max_first_click_rank as __first_click__rank_condition
, sum(case when __first_click__rank_condition then 1 else 0 end) over(partition
by qid, __period_number order by __datetime, __priority, __id) as
__first_click__target_count
from max_last_click_rank
)
SELECT qid, __datetime, __id, __priority,`__if_missed`, __link, __period_number
, first_value(utmSource) over(partition by qid, __period_number, __last_click__target_count order by __datetime, __priority, __id) as __myfirstfunnel_last_click_utmSource
, first_value(utmMedium) over(partition by qid, __period_number, __last_click__target_count order by __datetime, __priority, __id) as __myfirstfunnel_last_click_utmMedium
, first_value(utmCampaign) over(partition by qid, __period_number, __last_click__target_count order by __datetime, __priority, __id) as __myfirstfunnel_last_click_utmCampaign
, first_value(utmTerm) over(partition by qid, __period_number, __last_click__target_count order by __datetime, __priority, __id) as __myfirstfunnel_last_click_utmTerm
, first_value(utmContent) over(partition by qid, __period_number, __last_click__target_count order by __datetime, __priority, __id) as __myfirstfunnel_last_click_utmContent
, first_value(adSourceDirty) over(partition by qid, __period_number, __last_click__target_count order by __datetime, __priority, __id) as __myfirstfunnel_last_click_adSourceDirty
<...>
FROM target_count
attr_final_table
{{
config(
materialized = 'table',
order_by = ('__datetime')
)
}}
with out as (
select * except(_dbt_source_relation)
from {{ ref('ttr_myfirstfunnel_join_to_attr_prepare_with_qid') }}
join {{ ref(attr_myfirstfunnel_model) }}
using (qid, __datetime, __id, __link, __period_number, __if_missed,
__priority)
)
select * from out
На всех шагах слоя используем следующий тип материализации: materialized = 'table'
.
Статус
Слой полностью автоматизирован.
Макросы
datacraft.attr_prepare_with_qid()
datacraft.attr_create_events()
datacraft.attr_add_row_number()
datacraft.attr_find_new_period()
datacraft.attr_calculate_period_number()
datacraft.attr_create_missed_steps()
datacraft.attr_join_to_attr_prepare_with_qid()
datacraft.attr_model()
datacraft.attr_final_table()
Dataset или датасет - последний шаг обработки данных
На этом этапе формируются датасеты, на основе которых строятся отчёты в BI-системе. В них попадают не все данные, а только необходимые для построения отчёта.
Если никаких дополнительных преобразований не требуется, датасет строится с помощью макроса, который фильтрует таблицу полученную на слое Full
или на Attribution
(если производилась атрибуция) и отбирает только необходимы поля и строки.
Какую таблицу брать за основу (full
или attr
) макрос определяет по пайплайну. Данные из каких источников включать прописывается в конфиге datasets, а данные по каждому источнику содержатся в конфиге datasources. На основе этих данных фильтруется исходная таблица. В финальный датасет попадают только те данные, которые относятся к указанным источнику, аккаунту и шаблону.
В один датасет могут входить несколько таблиц, описанных в datasets. Какие конкретно таблицы входят передаётся в качестве аргумента макроса.
Если же необходимы какие-то специфические преобразования или расчёты, то запрос пишется вручную конкретно под задачу.
Зависит либо от full-таблиц, либо от таблиц слоя Attribution
.
dataset_{название датасета}
Создаётся столько моделей, сколько необходимо.
(без использования макроса)
{{
config(
materialized = 'table',
order_by = ('__datetime')
)
}}
SELECT * FROM (
(
select toLowCardinality('full_datestat') as _dbt_source_relation,
toString("None") as None ,
toDate("__date") as __date ,
toString("reportType") as reportType ,
toString("accountName") as accountName ,
toString("__table_name") as __table_name ,
toString("adSourceDirty") as adSourceDirty ,
<...>
toString('') as __myfirstfunnel_first_click_utmTerm ,
toString('') as __myfirstfunnel_first_click_utmContent ,
toString('') as __myfirstfunnel_first_click_adSourceDirty
from {{ ref('full_datestat') }}
)
union all
(
select toLowCardinality('attr_myfirstfunnel_final_table') as _dbt_source_relation,
toString("None") as None ,
toDate("__date") as __date ,
toString('') as reportType ,
toString("accountName") as accountName ,
toString("__table_name") as __table_name ,
toString("adSourceDirty") as adSourceDirty ,
<...>
toString("__myfirstfunnel_first_click_utmTerm") as __myfirstfunnel_first_click_utmTerm ,
toString("__myfirstfunnel_first_click_utmContent") as __myfirstfunnel_first_click_utmContent ,
toString("__myfirstfunnel_first_click_adSourceDirty") as __myfirstfunnel_first_click_adSourceDirty
from {{ ref('attr_myfirstfunnel_final_table') }}
)
)
WHERE
splitByChar('_', __table_name)[4] = 'yd'
and
splitByChar('_', __table_name)[4] = 'testaccount'
and
splitByChar('_', __table_name)[4] = 'default'
UNION ALL
<...>
Используется materialized = 'table'
.
Статус
Слой полностью автоматизирован.
Макросы
datacraft.create_dataset()
Подробнее про макрос creat_dataset.